package flinkstudy.stream.source;

import flinkstudy.bo.EstateInfo;
import flinkstudy.stream.source.mock.DynamicUnlimitedData;
import lombok.SneakyThrows;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.junit.Before;
import org.junit.Test;

import javax.annotation.Nullable;
import java.util.List;

/**
 * 操作数据源
 *
 * @author daocr
 * @date 2019/12/14
 */
public class DataStreamSourceTest extends BaseTest {


    DynamicUnlimitedData dynamicUnlimitedData = null;

    @Before
    public void init() {
        dynamicUnlimitedData = new DynamicUnlimitedData();
        super.init();
    }


    /**
     * 数据转换
     */
    @Test
    public void map() throws Exception {

        DataStreamSource<EstateInfo> estateInfoDataStreamSource = env.addSource(dynamicUnlimitedData);

        estateInfoDataStreamSource.map(e -> {
            e.setCreateTimeFormat(e.getCreateTimeFormat() + "---");
            return e;
        }).print();

        env.execute();

    }


    @Test
    public void coGroup() throws Exception {

        SingleOutputStreamOperator<Tuple2<Long, String>> source1 = env.fromElements(
                Tuple2.of(1L, "xiaoming"),
                Tuple2.of(2L, "xiaowang"))
//                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksImpl())
                ;

        SingleOutputStreamOperator<Tuple2<Long, String>> source2 = env.fromElements(
                Tuple2.of(2L, "xiaoli"),
                Tuple2.of(1L, "shinelon"),
                Tuple2.of(3L, "hhhhhh"))
//                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksImpl())
                ;



        source1.coGroup(source2).where(new KeySelector<Tuple2<Long, String>, Long>() {
            @Override
            public Long getKey(Tuple2<Long, String> value) throws Exception {
                return value.f0;
            }
        }).equalTo(new KeySelector<Tuple2<Long, String>, Long>() {
            @Override
            public Long getKey(Tuple2<Long, String> value) throws Exception {
                return value.f0;
            }
        }).window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new CoGroupFunction<Tuple2<Long, String>, Tuple2<Long, String>, List<Tuple2>>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<Long, String>> first, Iterable<Tuple2<Long, String>> second, Collector<List<Tuple2>> out) throws Exception {
                        System.out.println("");
                    }
                });

        env.execute();

    }

    private static class AssignerWithPeriodicWatermarksImpl implements AssignerWithPeriodicWatermarks {


        private Long time = null;

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(time);
        }

        @SneakyThrows
        @Override
        public long extractTimestamp(Object element, long previousElementTimestamp) {
            Thread.sleep(1000);
            time = System.currentTimeMillis();
            return time;
        }
    }


}
